跳到主要内容

Spring 整合 ElasticSearch

补充知识 Elasticsearch 与 MySQL 数据同步的方法,具体方案选型参考:Elasticsearch 存储设计与MySQL数据同步方案 这个 Spring Data ES 的使用参考自 通过Spring Data Elasticsearch操作ES

总之现在依赖的工具:

  • SpringBoot
  • Elasticsearch
  • MySQL
  • 阿里的 Canal

Spring 的 ES 客户端们

<!-- https://mvnrepository.com/arti ... /rest -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>rest</artifactId>
</dependency>

<!-- https://mvnrepository.com/arti ... earch -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</dependency>

<!-- https://mvnrepository.com/arti ... lient -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
</dependency>

首先:Spring 简单整合 ES

首先来看下 SpringBoot 如何整合 ES,首先修改下 ES 的配置文件,配置一下跨域

http.cors.enabled: true
http.cors.allow-origin: "*"

然后 Docker 重启下 ES

Spring Boot 引入 ES,注意,得先检查下版本,不清楚使用什么版本参考 这个网站

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

配置 application.yml:

spring:
application:
name: map-service-search
elasticsearch:
rest:
uris: http://localhost:19200
password:
username:
read-timeout: 1m
connection-timeout: 1s

注意:也可以不配这个,自己手动创建 Bean 配置(具体看原文)

配置好后测试创建一个 index

@SpringBootApplication
public class SearchApplication {
public static void main(String[] args) {
SpringApplication.run(SearchApplication.class, args);
}

/**
* 创建一个 index(对应关系型数据库的库)
*/
@Bean
public boolean createTestIndex(RestHighLevelClient restHighLevelClient) throws Exception {
try {
// 如果存在则先删除 "hello-world" 如果不存在会抛出错误,这里不处理就行了
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest("hello-world");
restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT); // 1
} catch (Exception ignored) {
}

// 导入的是 org.elasticsearch.client.indices.CreateIndexRequest
CreateIndexRequest createIndexRequest = new CreateIndexRequest("hello-world");

createIndexRequest.settings(
Settings.builder()
.put("index.number_of_shards", 1) // 分片数(默认 5)
.put("index.number_of_replicas", 0)); // 备份分片数目(默认 1)

restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT); // 2
return true;
}
}

检查是否创建成功

http://localhost:19200/_cat/indices?v

可以看到已经注册成功了

操作 ES 增删改查操作

创建自定义 Doc 实体类

要把数据放到 es 里,那么其中的索引那些就是通过自定义 doc 类来配置的(这个 doc 类里面就包含了映射规则,索引库,分词和不分词)。类似于前台网站要进行查询、排序展示的数据就是通过这个规则来执行的。

package com.alsritter.service.search.model;

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

// 如果是单机版本的一台服务器上的 es,那么 创建的index中 replicas 必须为0,
// 因为es的分片副本不可和原分片在同一个节点上(即同一台服务或同一个虚拟操作系统中)
@Data
@Document(indexName = "topics" , shards = 2, replicas = 0)
public class TopicDoc {
/**
* topicId
*/
@Id
@Field(type = FieldType.Long)
private Long topicId;
/**
* 标签
*/
private String tag;
/**
* 文章标题 要分词 FieldType.Text
*/
@Field(type = FieldType.Text,analyzer = "ik_max_word",searchAnalyzer = "ik_max_word")
private String title;
/**
* 内容
*/
@Field(type = FieldType.Text,analyzer = "ik_max_word",searchAnalyzer = "ik_max_word")
private String content;
}

创建 DocRepository 类型

这个有点像 MyBatisPlus 中的 BaseMapper,就是提供一些默认的增删改操作

package com.alsritter.service.search.repository;

import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import com.alsritter.service.search.model.TopicDoc;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.scheduling.annotation.Async;

import java.util.concurrent.Future;

/**
* 注意这里是接口
* ElasticsearchRepository<要映射的类型,主键类型>
* 这个接口里面有基础的 crud,所以需要继承这个接口
*
* @author alsritter
* @version 1.0
**/
public interface TopicDocRepository extends ElasticsearchRepository<TopicDoc, Long> {

/**
* 根据标题返回消息列表,这里的 Pageable 工具类除了可以支持分页,也支持了排序
*/
Page<TopicDoc> findByTitle(String title, Pageable pageable);

/**
* 通过 Future 异步获取数据,Top 就是我们取数据集的第一条
*/
@Async
Future<TopicDoc> findTopByTag(String tag, Sort sort);

/**
* 复合查询条件,必须同时满足
* {“bool” : {“must” : [ {“field” : {“title” : “?”}}, {“field” : {“tag” : “?”}} ]}}
*/
Page<TopicDoc> findByTitleAndTag(String title, String tag, Pageable pageable);

/**
* 复合查询条件,满足其中之一
* {“bool” : {“should” : [ {“field” : {“title” : “?”}}, {“field” : {“tag” : “?”}} ]}}
*/
Page<TopicDoc> findByTitleOrTag(String title, String tag, Pageable pageable);
}

编写测试类

建立测试类

@SpringBootTest
class SearchApplicationTest {
@Autowired
private TopicDocRepository topicDocRepository;

// 清空数据并验证数据条数是否已为0
@Order(0)
@Test
@DisplayName("清空所有数据")
void clearIndex() {
topicDocRepository.deleteAll();
assertTrue(0 == topicDocRepository.count(), "数据已清空完毕");
}

/**
* 这个 @CsvSource 是参数化测试,一般用来测试多参数的数据
* 具体参考:https://doczhcn.gitbook.io/junit5/index/index-2/parameterized-tests
* 简单阅读:JUnit5学习之六:参数化测试(Parameterized Tests)基础
* https://www.cnblogs.com/bolingcavalry/p/14454696.html
*/
@ParameterizedTest
@Order(1)
@CsvSource({
"How's the weather today?, issue, Today I came back to my hometown. It was a sunny and windy day.",
"How many people are there now?, issue, There should be tens of thousands of people inside.",
})
@DisplayName("初始化数据")
void initTest(String title, String tag, String content) {
// 每插入一条数据记录数加一
// String uuid = UUID.randomUUID().toString();
Long id = RandomUtil.randomLong(10000);
Instant time = Instant.now();
TopicDoc topic = new TopicDoc(Long.valueOf(id), tag, title, content, time);
// 保存一条新的 doc 并获取返回数据
TopicDoc doc = topicDocRepository.save(topic);
// 对比存入后返回的数据与传入参数的 title 是否一致
assertEquals(topic.getTitle(), doc.getTitle(), "Title 一致");
}
}

点进 Kibana 控制台就可以看到刚刚插入的数据了

这个 Kibana 用法参考:https://www.cnblogs.com/chenqionghe/p/12503181.html

配置响应式的方式

补充个反应式的

package com.alsritter.service.search.repository;

import com.alsritter.service.search.model.TopicDoc;
import org.springframework.data.domain.Pageable;
import org.springframework.data.repository.reactive.ReactiveSortingRepository;
import reactor.core.publisher.Flux;

/**
* 反应式编程
* 通过 Reactive 进行 TopicDoc 操作
* 返回类型为 Flux<T> 或 Mono<T>
* ReactiveSortingRepository 继承了 ReactiveCrudRepository,所以我们直接继承 ReactiveSortingRepository
* 在写法上除了返回类型不同,其它与 TopicDocRepository 类似
*
* @author alsritter
* @version 1.0
**/
public interface ReactiveTopicDocRepository extends ReactiveSortingRepository<TopicDoc, Long> {
Flux<TopicDoc> findByTag(String tag, Pageable pageable);
}

注意,使用响应式编程,需要先导入依赖:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

然后配置它的 Bean

/**
* 方法配置了 ES Data 的响应式客户端,并通过注解启用 ReactiveElasticsearchRepositories
*
* @author alsritter
* @version 1.0
**/
@Configuration
@EnableReactiveElasticsearchRepositories
public class ReactiveElasticsearchConfig extends AbstractReactiveElasticsearchConfiguration {
@Override
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo("localhost:19200")
.build();
return ReactiveRestClients.create(clientConfiguration);
}
}

测试增删改查

增删改查测试:

package com.alsritter.service.search;

import cn.hutool.core.util.RandomUtil;
import com.alsritter.service.search.model.TopicDoc;
import com.alsritter.service.search.repository.ReactiveTopicDocRepository;
import com.alsritter.service.search.repository.TopicDocRepository;
import org.junit.jupiter.api.*;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import reactor.core.publisher.Flux;

import java.time.Instant;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.*;

@SpringBootTest
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class SearchApplicationTest {
@Autowired
private TopicDocRepository topicDocRepository;
@Autowired
private ReactiveTopicDocRepository reactiveTopicDocRepository;

// 清空数据并验证数据条数是否已为0
@Order(0)
@Test
@DisplayName("清空所有数据")
void clearIndex() {
topicDocRepository.deleteAll();
assertTrue(0 == topicDocRepository.count(), "数据已清空完毕");
}

/**
* 这个 @CsvSource 是参数化测试,一般用来测试多参数的数据
* 具体参考:https://doczhcn.gitbook.io/junit5/index/index-2/parameterized-tests
* 简单阅读:JUnit5学习之六:参数化测试(Parameterized Tests)基础 https://www.cnblogs.com/bolingcavalry/p/14454696.html
*/
@ParameterizedTest
@Order(1)
@CsvSource({
"How's the weather today?, issue, Today I came back to my hometown. It was a sunny and windy day.",
"How many people are there now?, issue, There should be tens of thousands of people inside.",
})
@DisplayName("初始化数据")
void initTest(String title, String tag, String content) {
// 每插入一条数据记录数加一
// String uuid = UUID.randomUUID().toString();
Long id = RandomUtil.randomLong(10000);
Instant time = Instant.now();
TopicDoc topic = new TopicDoc(Long.valueOf(id), tag, title, content, time);
// 保存一条新的 doc 并获取返回数据
TopicDoc doc = topicDocRepository.save(topic);
// 对比存入后返回的数据与传入参数的 title 是否一致
assertEquals(topic.getTitle(), doc.getTitle(), "Title 一致");
}


@DisplayName("获取消息条数")
@Order(2)
@Test
void getTopicQuantity() {
// 通过响应式方式获取所有消息
Flux<TopicDoc> msg = reactiveTopicDocRepository.findAll();
// 通过一般方式获取消息条数
long msgCount = topicDocRepository.count();
System.out.println("共有消息,msg.count() : " + msg.count().block().longValue());
System.out.println("共有消息,msgCount : " + msgCount);
// 如果数据量特别多这个断言是会失败的,因为findAll如果不加分页,则其会默认最多1000条
assertTrue(
(msg.count().block().longValue() == msgCount), "消息条数符合预期");
}


private static Long tstId = 1234567L;


@DisplayName("保存新的主题")
@Order(3)
@Test
void saveTopic() {
TopicDoc topic =
new TopicDoc(
tstId,
"work",
"沁园春·辛丑春重返达坂城",
"放眼乾坤,漫步山川,回首汉唐。想旌旗西指,乌孙路远;管弦东去,赤子情长。营帐生烟,轮蹄迸火,于此都曾系马缰。登临处,问悠悠岁月,几度沧桑。 且随雁阵翱翔,把城外城中细打量。对蓝天白日,车飞高速;青杨红柳,人在仙乡。雪浪清心,冰峰爽目,铺绿良田扩四方。吾来也,用吟诗摄影,留住春光。",
Instant.now());
TopicDoc doc = topicDocRepository.save(topic);
assertAll(
"msg",
() -> assertEquals(topic.getTitle(), doc.getTitle()),
() -> assertEquals(topic.getTopicId(), doc.getTopicId()));
}


@DisplayName("更新主题")
@Order(4)
@Test
void updateTopic() {
Optional<TopicDoc> optionalTopicDoc = topicDocRepository.findById(tstId);

TopicDoc doc = optionalTopicDoc.get();
String title = doc.getTitle();
String newTitle = "沁园春·修改后的标题";
doc.setTitle(newTitle);
TopicDoc savedMsg = topicDocRepository.save(doc);
assertAll(
"msg",
() -> assertEquals(savedMsg.getTitle(), newTitle),
() -> assertNotEquals(savedMsg.getTitle(), title),
() -> assertEquals(savedMsg.getTopicId(), tstId));
}


@DisplayName("删除主题")
@Order(5)
@Test
void delTopic() {
// 看看该消息是否存在
boolean existTopicBeforeDel = topicDocRepository.existsById(tstId);
// 删除该消息
topicDocRepository.deleteById(tstId);
// 看看该消息是否还存在
boolean existTopicAfterDel = topicDocRepository.existsById(tstId);
assertAll("exist topic", () -> assertTrue(existTopicBeforeDel), () -> assertFalse(existTopicAfterDel));
}


@DisplayName("获取运营发的消息")
@Order(7)
@Test
void getTopicDocFromTag() {
String tag = "issue";
Flux<TopicDoc> topics = reactiveTopicDocRepository.findByTag(tag, PageRequest.of(0, 2, Sort.by("topicId").descending()));
System.out.println("消息共有: " + topics.count().block().longValue());
topics.toStream()
.forEach(
topic -> {
System.out.println("内容是: " + topic.getContent());
System.out.println("标题是: " + topic.getTitle());
});
}
}

什么是 Canal

参考资料 开源数据同步神器——canal

Canal 是阿里的 一个数据同步工具,canal 通过模拟成为 mysql 的 slave 的方式,监听 mysql 的 binlog 日志来获取数据

怎么安装 Canal

Reference

how-to-connect-java-with-elasticsearch Spring Data Elasticsearch Example ElasticSearch 7.x with Springboot 2.3.x - 让ES在我们的程序中起飞